Migrate BibliothecaPurchaseMonitor to Celery tasks#3385
Conversation
|
Claude finished @dbernstein's task in 7m 29s —— View job SummaryWell-structured migration with clean task architecture, a thorough test suite (~60 tests), and consistent use of DetailsMinor:
|
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #3385 +/- ##
==========================================
+ Coverage 93.37% 93.39% +0.01%
==========================================
Files 503 504 +1
Lines 46126 46158 +32
Branches 6323 6325 +2
==========================================
+ Hits 43070 43107 +37
+ Misses 1980 1973 -7
- Partials 1076 1078 +2 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
f56fc86 to
6a34bf8
Compare
6803451 to
0755fca
Compare
947d402 to
491e857
Compare
Replace the legacy script-driven BibliothecaPurchaseMonitor with a Celery-native purchase_collection task that processes one day of MARC records per invocation and chains days via task.replace() until the collection is caught up. - Add BibliothecaPurchaseImporter (bibliotheca_purchase_importer.py) with get_start/import_day/_purchases/_process_record; uses hash-based bibliographic_apply.delay() instead of BibliothecaBibliographicCoverageProvider - Add purchase_all_collections and purchase_collection Celery tasks with Redis workflow lock (_purchase_workflow_lock, independent key from import_workflow_lock), autoretry for BadResponseException/RequestTimedOut, and replace-per-day chaining - Add daily beat schedule entry (4:00 AM) in celery.py - Add ImportPurchaseCollection script + bin/bibliotheca_purchase_import for manual trigger (mirrors ImportEventCollection from PR 1) - Delete BibliothecaTimelineMonitor, BibliothecaPurchaseMonitor, RunBibliothecaPurchaseMonitorScript from bibliotheca.py and remove now-dead imports - Delete bin/bibliotheca_purchase_monitor (replaced by Celery beat) - Add default_language_version: python3.12 to .pre-commit-config.yaml so check-ast can parse Python 3.12 generic class syntax already present in bibliotheca.py Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Replace %-style log arguments and implicit f-string + bare-string concatenations with f-strings throughout the new purchase monitor files. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- Remove TestBibliothecaPurchaseMonitor and TestBibliothecaPurchaseMonitorWhenMultipleCollections from test_bibliotheca.py, along with the now-dead BibliothecaPurchaseMonitor and TimestampData imports - Add return type Iterator[MagicMock] to mock_marc_request in test_bibliotheca_purchase_importer.py, removing the unnecessary type: ignore[no-untyped-def] comment Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Previously import_day fetched all pages for a day in a single loop, allowing a single task to process an unbounded number of records if a day had many purchases. This brings the purchase monitor in line with every other paginated Celery task in the codebase. Changes: - import_day(current_day, cutoff, offset=1) now fetches exactly one API page (up to _MARC_PAGE_SIZE=50 records) and returns DayImportResult.next_offset: set to offset+50 when the page was full (more records remain), or None when the page was partial (day done) - Remove _purchases() — pagination now lives at the task level via task.replace(), consistent with the event-import and Overdrive patterns - Timestamp is checkpointed after every page: finish=current_day while the day is in progress (so a worker crash restarts from this day, not the previous one), finish=day_end when the day completes - purchase_collection gains an offset: int = 1 parameter and handles two replace paths: same day + next offset, or next day + offset reset to 1 - Tests updated: replace test_paginates_marc_request with test_returns_next_offset_when_page_is_full, test_returns_no_next_offset_when_page_is_partial, and test_passes_offset_to_marc_request; add test_replaces_with_next_offset_when_page_full to task tests; tighten test_replaces_when_more_days_remain to assert offset=1 Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
All identifiers introduced for the purchase-record sync now carry the word "Record" to make the concept explicit: BibliothecaPurchaseRecordImporter, PURCHASE_RECORD_SERVICE_NAME, import_purchase_records_for_all_collections, import_purchase_records_by_collection, the Redis lock key, beat-schedule entry, bin script, and all test fixtures/classes. Pure mechanical rename; no logic changes. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Cover the five cases that mirror TestImportEventCollection: --import-all, --collection <name>, unknown collection, no args, and both args simultaneously. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Adds a --force-reimport flag to ImportPurchaseRecordCollection and a matching force_reimport parameter to import_purchase_records_for_all_collections. When set, the import starts from DEFAULT_PURCHASE_RECORD_START_TIME (2014-01-01) instead of resuming from the stored Timestamp. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
No functional change; remove the decorative `# ---` block separating the event-import and purchase-record-import sections in both the task module and its test file, as requested in the PR review. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
The variable tracks whether lock_value is None (i.e. this is the first invocation of the workflow chain), not whether we are processing the first calendar day. Rename to is_first_invocation for clarity, and update the matching warning log message and test assertion. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Replace the vague observation that "collections typically go back to 2014-01-01" with an explanation of where the value came from and why we are preserving it, as requested in the PR review. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Cover the branch in BibliothecaPurchaseRecordImporter._process_record that skips (and logs an error for) a MARC record that contains more than one 001 field, as flagged in the PR review. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Both BibliothecaEventImporter and BibliothecaPurchaseRecordImporter are Celery tasks, not legacy monitors, so their Timestamp records should carry service_type=TASK_TYPE rather than MONITOR_TYPE. Update both importers and their test fixtures accordingly. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
The previous commit updated the source importer but missed test_bibliotheca_importer.py, which was still stamping and looking up Timestamps with MONITOR_TYPE. The mismatch caused two CI failures; this aligns the test fixtures with the updated importer. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
- Move DEFAULT_PURCHASE_RECORD_START_TIME, _MARC_PAGE_SIZE, and DayImportResult out of inline test-body imports to the top of test_bibliotheca.py. - test_replaces_when_more_days_remain: capture stored_finish and assert the replace current_day equals stored_finish + 1 day, rather than the previous weak 'is not None' check. - test_replaces_with_next_offset_when_page_full: replace the BibliothecaAPI-level mock (which drove 50 records through _process_record's no-control-number error branch) with a direct mock of import_day returning a DayImportResult with next_offset set. - Both test_lock_value_passed_through_on_replace tests: replace isinstance(lock_value, str) with UUID(lock_value) to confirm the generated value is a well-formed UUID. - Add test_prior_timestamp_with_null_finish_returns_default_start: covers the explicit 'if timestamp.finish is None' branch in get_start() that was previously untested. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
The field counted all records returned by the API page — including those silently skipped by _process_record due to a missing or duplicate 001 control-number field. Calling it "handled" (and the achievement string "MARC records processed") implied successful processing, which overstated actual work done and obscured error-log churn from bad records. Renamed to records_fetched and updated the achievement string and task log message to "fetched" to accurately reflect that this is a raw API page count, not a successfully-processed count. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Private helper functions do not need docstrings. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Keep the brief description but drop the verbose explanation of the distinct-key rationale (which was the part flagged in review). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
6da8d36 to
d0717dc
Compare
Description
Replaces the legacy script-driven
BibliothecaPurchaseMonitor(and its helperBibliothecaTimelineMonitor) with a pair of Celery tasks that page through MARC purchase records one day at a time, chaining viatask.replace()until the collection is fully caught up toutc_now().This is PR 2 of 3 in the Bibliotheca → Celery migration series (PR 1 migrated the event monitor; PR 3 will migrate the circulation sweep).
FYI: I create a follow-on ticket to remove dead monitor infrastructure code.
Task architecture
import_purchase_records_for_all_collections(fan-out)import_purchase_records_by_collectiontask per collection.force_reimport=Trueflag (see Manual scripts below).import_purchase_records_by_collection(per-collection worker)[current_day, day_end], whereday_end = min(current_day + 1 day, utc_now()).task.replace():offsetadvanced by 50,current_dayunchanged.current_dayadvanced today_endandoffsetreset to 1.current_day >= utc_now()(collection is up to date).First run vs. subsequent runs
DEFAULT_PURCHASE_RECORD_START_TIME(2014-01-01) when noTimestampexists; otherwiseTimestamp.finishTimestamp.finish(resume from last completed day)Mid-chain invocations receive
current_dayas an explicit parameter, bypassingget_start()entirely so the storedTimestampis never consulted again until the chain finishes.Timestamp checkpointing
Timestamp.finishis updated after every page — not just at the end of the chain — to guard against crashes mid-backfill:finish = current_day. A restart will re-process the current day from offset 1 rather than falling back to a previous day.finish = day_end. The next beat trigger picks up from the next day.Failure modes
BadResponseException/RequestTimedOutignored_exceptions) so a transient API error does not open a window for a second concurrent run.RemoteIntegrationExceptionthrows=— treated as a terminal, non-retriable error. The exception is logged and discarded; Celery does not mark the task as failed. This approach mirrors other celery importer routinesRedis workflow lock
["PurchaseRecordCollectionWorkflow", <collection-redis-key>]lock_valueis generated on the first invocation and forwarded unchanged to every subsequent re-queued task. Becausetask.replace()raisescelery.exceptions.Ignore,Ignoreis listed inignored_exceptionsso the lock is not released when the task replaces itself.MARC record processing
Each MARC record goes through
BibliothecaPurchaseRecordImporter._process_record():001(logs an error and skips if missing or ambiguous — i.e. zero or more than one001field).LicensePoolviaLicensePool.for_foreign_id().api.bibliographic_lookup()and, for each result, checksbibliographic.needs_apply()(hash-based deduplication). Only changed metadata queues abibliographic_applytask on theapplyqueue; unchanged records produce no database writes.Manual scripts
bin/bibliotheca_purchase_record_import(CLI entry point) dispatchesImportPurchaseRecordCollection:--force-reimportpassescurrent_day=DEFAULT_PURCHASE_RECORD_START_TIMEdirectly toimport_purchase_records_by_collection.delay()(or, for--import-all, setsforce_reimport=Trueonimport_purchase_records_for_all_collections.delay()which fans it out to each collection). This completely bypasses the storedTimestamp, restarting the full backfill from 2014-01-01.Motivation and Context
JIRA: (PP-4391)
The three Bibliotheca monitors (event, purchase, circulation sweep) are driven by external cron jobs invoking one-shot scripts backed by a legacy
Monitorbase-class hierarchy. Migrating to Celery gives us:task.replace()to avoid tying up a worker thread for the full duration of a large historical backfillautoretry_forinstead of bespoke try/except loopsHow Has This Been Tested?
~60 tests across five test files — all passing:
test_bibliotheca.py—TestBibliothecaImportPurchaseRecordsForAllCollectionsandTestBibliothecaImportPurchaseRecordsByCollection: first-run default start, stored-timestamp resume, already-up-to-date early exit, full-page replace with advanced offset, partial-page replace with advanced day,force_reimportflag propagation, lock-held skip/warning, lock-expired mid-chain warning, lock forwarded across replaces, lock not released on autoretry, lock independence from event-import lock.test_bibliotheca_purchase_record_importer.py—get_start()with and without prior timestamp;import_day()window bounds, cutoff capping, timestamp stamping (mid-day vs. day-complete), record count, offset forwarding, next-offset logic;_process_record()happy path, missing control number, multiple control numbers,bibliographic_applyqueued/skipped based onneeds_apply().test_bibliotheca_scripts.py— bothImportEventCollectionandImportPurchaseRecordCollection:--import-all,--collection,--force-reimport, unknown collection error, missing args error, conflicting args error.Checklist